/**
 * Copyright 2019 吉鼎科技.

 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cn.easyplatform.support.transform.handler;

import cn.easyplatform.EasyPlatformWithLabelKeyException;
import cn.easyplatform.ScriptEvalException;
import cn.easyplatform.ScriptEvalExitException;
import cn.easyplatform.ScriptRuntimeException;
import cn.easyplatform.contexts.Contexts;
import cn.easyplatform.contexts.RecordContext;
import cn.easyplatform.contexts.WorkflowContext;
import cn.easyplatform.dos.FieldDo;
import cn.easyplatform.dos.Record;
import cn.easyplatform.entities.helper.EventLogic;
import cn.easyplatform.i18n.I18N;
import cn.easyplatform.interceptor.CommandContext;
import cn.easyplatform.log.LogManager;
import cn.easyplatform.messages.vos.executor.BeginVo;
import cn.easyplatform.messages.vos.executor.EndVo;
import cn.easyplatform.messages.vos.executor.ErrorVo;
import cn.easyplatform.messages.vos.executor.ProgressVo;
import cn.easyplatform.spi.listener.event.MessageEvent;
import cn.easyplatform.support.scripting.CompliableScriptEngine;
import cn.easyplatform.support.scripting.ScriptEngineFactory;
import cn.easyplatform.support.transform.Parameter;
import cn.easyplatform.support.transform.TransformerHandler;
import cn.easyplatform.type.FieldType;
import cn.easyplatform.util.MessageUtils;
import cn.easyplatform.util.RuntimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author <a href="mailto:davidchen@epclouds.com">littleDog</a> <br/>
 * @since 2.0.0 <br/>
 */
public class FlatingHandler implements
        TransformerHandler<Map<String, List<List<FieldDo>>>> {

    private final static Logger LOG = LoggerFactory.getLogger(FlatingHandler.class);

    private CommandContext cc;

    private RecordContext rc;

    private int firstRowNum, firstColNum, lastRowNum, lastColNum;

    private EventLogic el;

    private AtomicInteger successCount, failCount, rowIndex;

    private AtomicBoolean runFlag;


    public FlatingHandler(CommandContext cc, RecordContext rc, String logic,
                          int firstRowNum, int firstColNum, int lastRowNum, int lastColNum) {
        this.cc = cc;
        this.rc = rc;
        this.firstRowNum = firstRowNum;
        this.firstColNum = firstColNum;
        this.lastRowNum = lastRowNum;
        this.lastColNum = lastColNum;
        el = RuntimeUtils.castTo(cc, rc, logic);
    }

    @Override
    public void transform(Map<String, List<List<FieldDo>>> data, Parameter parameter) {
        if (firstRowNum > 0)
            firstRowNum--;
        if (firstColNum > 0)
            firstColNum--;
        successCount = new AtomicInteger(0);
        failCount = new AtomicInteger(0);
        runFlag = new AtomicBoolean(true);
        if (parameter != null) {
            if (parameter.isProgress()) {
                BeginVo bv = new BeginVo((I18N.getLabel("upload.execute.logic")));
                int total = 0;
                for (List<List<FieldDo>> list : data.values())
                    total += list.size() - firstRowNum;
                bv.setTotal(total);
                cc.send(new MessageEvent(cc.getWorkflowContext().getId(), bv));
                rowIndex = new AtomicInteger(0);
            }
            if (parameter.getCountOfThread() > 0)
                asyncProcess(data, parameter);
            else
                syncProcess(data, parameter);
        } else {
            syncProcess(data, parameter);
        }
        rc.setVariable(new FieldDo("SUCCESS_COUNT", FieldType.INT,
                successCount.get()));
        rc.setVariable(new FieldDo("FAIL_COUNT", FieldType.INT,
                failCount.get()));
        if (parameter != null && parameter.isProgress() && runFlag.get())
            cc.send(new MessageEvent(cc.getWorkflowContext().getId(), new EndVo(new Object[]{successCount.get(), failCount.get()})));
    }

    /**
     * 同步执行
     *
     * @param data
     * @param parameter
     */
    private void syncProcess(Map<String, List<List<FieldDo>>> data, Parameter parameter) {
        CompliableScriptEngine compiler = null;
        try {
            compiler = ScriptEngineFactory.createCompilableEngine(cc,
                    el.getContent());
            WorkflowContext ctx = cc.getWorkflowContext();
            for (Map.Entry<String, List<List<FieldDo>>> entry : data.entrySet()) {
                List<List<FieldDo>> records = entry.getValue();
                int rowNums = records.size();
                if (lastRowNum > 0 && rowNums > lastRowNum)
                    rowNums = lastRowNum;
                rc.setVariable(new FieldDo("SHEET_NAME", FieldType.VARCHAR,
                        entry.getKey()));
                process(ctx, compiler, firstRowNum, rowNums, records);
            }
        } finally {
            compiler.destroy();
        }
    }

    private void asyncProcess(Map<String, List<List<FieldDo>>> data, Parameter parameter) {
        WorkflowContext ctx = cc.getWorkflowContext();
        for (Map.Entry<String, List<List<FieldDo>>> entry : data.entrySet()) {
            List<List<FieldDo>> records = entry.getValue();
            int rowNums = records.size();
            if (lastRowNum > 0 && rowNums > lastRowNum)
                rowNums = lastRowNum;
            rc.setVariable(new FieldDo("SHEET_NAME", FieldType.VARCHAR,
                    entry.getKey()));
            int num = (rowNums - firstRowNum) / parameter.getCountOfThread();
            if (num > 50) {//至少50条以上才启多线程
                if (firstRowNum > 0 && rowNums > firstRowNum)
                    records = records.subList(firstRowNum, rowNums);
                List<List<List<FieldDo>>> penddingList = new ArrayList<>();
                while (true) {
                    if (records.size() > num) {
                        List<List<FieldDo>> tmp = records.subList(0, num);
                        penddingList.add(tmp);
                        records = records.subList(num, records.size());
                    } else {
                        if (!records.isEmpty())
                            penddingList.add(records);
                        break;
                    }
                }
                CountDownLatch latch = new CountDownLatch(penddingList.size());
                for (int i = 0; i < penddingList.size(); i++) {
                    List<List<FieldDo>> list = penddingList.get(i);
                    new Executor("EP Import thread " + i, latch, list).start();
                }
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    if (LOG.isErrorEnabled())
                        LOG.error("CountDownLatch", e);
                }
            } else {
                CompliableScriptEngine compiler = null;
                try {
                    compiler = ScriptEngineFactory.createCompilableEngine(cc,
                            el.getContent());
                    process(ctx, compiler, firstRowNum, rowNums, records);
                } finally {
                    compiler.destroy();
                }
            }
        }
    }

    private void process(WorkflowContext ctx, CompliableScriptEngine compiler, int first, int size, List<List<FieldDo>> records) {
        for (int row = first; row < size; row++) {
            if (!runFlag.get())
                break;
            if (rowIndex != null && rowIndex.incrementAndGet() % 10 == 0)
                cc.send(new MessageEvent(ctx.getId(),
                        new ProgressVo(rowIndex.get())));
            List<FieldDo> objs = records.get(row);
            RecordContext record = rc.copy();
            record.setData(new Record());
            record.setParameter("759", "IMPORT");
            int colNums = objs.size();
            if (lastColNum > 0 && colNums > lastColNum)
                colNums = lastColNum;
            for (int col = firstColNum; col < colNums; col++)
                record.getData().set(objs.get(col));
            try {
                compiler.eval(record, record);
                successCount.incrementAndGet();
            } catch (ScriptEvalExitException ex) {
                if (!record.getParameterAsBoolean("855"))// 如果855不为真，则直接退出
                    throw new ScriptRuntimeException(ex.getMessage(),
                            cc.getMessage(ex.getMessage(),
                                    ex.getRecord()));
                failCount.incrementAndGet();
                continue;
            } catch (ScriptEvalException ex) {
                ex.setSource(row + ":" + el.toString());
                throw ex;
            } catch (EasyPlatformWithLabelKeyException ex) {
                ex.setSource(row + ":" + el.toString());
                throw ex;
            }
        }
    }

    private class Executor extends Thread {

        private CountDownLatch latch;
        private List<List<FieldDo>> data;

        public Executor(String name, CountDownLatch latch, List<List<FieldDo>> data) {
            super(name);
            this.latch = latch;
            this.data = data;
        }

        @Override
        public void run() {
            Contexts.set(CommandContext.class, cc);
            WorkflowContext ctx = cc.getWorkflowContext();
            LogManager.beginRequest(cc.getEnv().getId(), cc.getUser());
            CompliableScriptEngine compiler = null;
            try {
                compiler = ScriptEngineFactory.createCompilableEngine(cc,
                        el.getContent());
                process(ctx, compiler, 0, data.size(), data);
            } catch (Exception e) {
                if (LOG.isErrorEnabled())
                    LOG.error("process", e);
                runFlag.set(false);
                cc.send(new MessageEvent(ctx.getId(), new ErrorVo(MessageUtils.getMessage(e))));
            } finally {
                latch.countDown();
                compiler.destroy();
                Contexts.clear();
            }
        }
    }

}
